package cn.smileyan.demos.core;

import cn.smileyan.demos.entity.CpuDataItem;
import cn.smileyan.demos.entity.TaskClusterData;
import cn.smileyan.demos.entity.TaskInput;
import cn.smileyan.demos.entity.ThresholdConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;


/**
 * 合并任务数据为集群数据
 * @author smileyan
 */
@Slf4j
public class TaskProcessingFunction extends ProcessWindowFunction<TaskInput, TaskClusterData, String, TimeWindow> {

    @Override
    public void process(String key,
                        ProcessWindowFunction<TaskInput, TaskClusterData, String, TimeWindow>.Context context,
                        Iterable<TaskInput> elements,
                        Collector<TaskClusterData> out) throws Exception {

        log.info("[{}] starting merge processing", key);

        final List<CpuDataItem> cpuDataItems = new LinkedList<>();
        Iterator<TaskInput> inputIterator = elements.iterator();
        TaskInput first = inputIterator.next();
        cpuDataItems.add(new CpuDataItem(first));
        String clusterId = first.getClusterId();
        String taskId = first.getTaskId();
        Integer clusterSize = first.getClusterSize();
        ThresholdConfig thresholdConfig = first.getThresholdConfig();

        while(inputIterator.hasNext()) {
            cpuDataItems.add(new CpuDataItem(inputIterator.next()));
        }

        log.info("[{}] finished merge processing", key);
        TaskClusterData clusterData = new TaskClusterData(taskId, clusterId, clusterSize, thresholdConfig, cpuDataItems);

        out.collect(clusterData);
    }
}

